Cloud Composer で Storage Transfer Service の転送ジョブのステータスを監視したい

Cloud Composer で Storage Transfer Service の転送ジョブのステータスを監視したい

Clock Icon2024.10.13

こんにちは!エノカワです。

Cloud Composer は、Apache Airflow で構築されたフルマネージドのワークフローオーケストレーションサービスです。
Apache Airflow のオープンソース プロジェクトを基に構築されており、Python プログラミング言語を使用して動作します。

やりたいこと

Cloud Composer上で動作する Airflow DAG で、Storage Transfer Service の転送ジョブの完了を待機する仕組みを検討しています。
具体的には、Google Cloud Storage に転送されたファイルを BigQuery に取り込むために、転送ジョブが完了するのを待つ必要があります。

今回は、CloudDataTransferServiceJobStatusSensorを使用して、DAG から転送ジョブの完了を待機できるか試してみましたのでご紹介します。

CloudDataTransferServiceJobStatusSensor とは

CloudDataTransferServiceJobStatusSensorは、指定した Storage Transfer Service の転送ジョブのステータスを監視するためのセンサーです。
このセンサーを用いることで、特定のジョブが完了したかどうかを確認し、その結果に応じて次のタスクを実行することができます。

ただし、実行中ジョブの完了を待機するのではなく、ジョブに属する少なくとも1つの操作が期待どおりのステータスになるまで待機するセンサーであることに注意が必要です
(私は当初この点を誤解しており、実行中ジョブの完了を待機するセンサーを想定しておりました)

環境作成

DAG を動かすために Cloud Composer 環境を作成します。
今回は最新バージョンの Composer 3 を選択しました。

cloud-composer-storage-transfer-service-status-sensor_01

Google Cloud コンソールの「Cloud Composer の環境作成」ページから、test-composerという名前の環境を東京リージョンで作成しました。

設定はデフォルトのままとしていますが、サービスアカウントやネットワーク設定は必要に応じて調整してください。

cloud-composer-storage-transfer-service-status-sensor_02

cloud-composer-storage-transfer-service-status-sensor_03

ファイル準備

Storage Transfer Service で転送するファイルを準備します。
今回はcm_enokawa_work_sourceという名前のバケットに、sales_で始まる日別のCSV形式ファイルをアップロードしました。
このファイルが Storage Transfer Service のジョブにより転送され、最終的に BigQuery に取り込まれます。

cloud-composer-storage-transfer-service-status-sensor_04

DAGを作成する

まず、Google Cloud Storageに あるファイルを BigQuery に取り込むための DAG を作成します。

gcs_transfer_to_bq.py
from airflow import DAG
from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.utils.dates import days_ago

PROJECT_ID = '{プロジェクトID}'
DATASET_ID = 'work'
TABLE_ID = 'sales_data'
BUCKET_NAME = 'cm_enokawa_work'

# DAGの基本設定
default_args = {
    'start_date': days_ago(1),
    'retries': 0,  # リトライなし
}

with DAG(
    dag_id='gcs_transfer_to_bq',
    default_args=default_args,
    schedule_interval=None,  # 手動で実行する
    catchup=False
) as dag:

    # GCS のファイルリストを取得
    list_gcs_files = GCSListObjectsOperator(
        task_id='list_gcs_files',
        bucket=BUCKET_NAME,
        prefix='sales_',  # 'sales_' で始まるファイルを取得
    )

    # 取得したファイルリストを使って BigQuery にロード
    gcs_to_bq = GCSToBigQueryOperator(
        task_id='load_gcs_to_bq',
        bucket=BUCKET_NAME,  # GCS バケット名
        source_objects=list_gcs_files.output,  # 取得したファイルリストを使う
        destination_project_dataset_table=f'{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}',  # BigQuery のデータセットとテーブル
        source_format='CSV',  # ソースファイル形式
        write_disposition='WRITE_TRUNCATE',  # 既存データの上書き
        skip_leading_rows=1  # CSV のヘッダー行をスキップ
    )

    list_gcs_files >> gcs_to_bq

この DAG では、Google Cloud Storage に保存された CSV ファイル (sales.csv) を BigQuery のテーブル(sales_data)にインポートしています。

cloud-composer-storage-transfer-service-status-sensor_07

次に、Storage Transfer Service の転送ジョブの完了を待機する DAG を作成します。
以下は、先ほどの DAG にCloudDataTransferServiceJobStatusSensorを追加したものです。

gcs_transfer_to_bq_with_status_sensor.py
from airflow import DAG
from airflow.providers.google.cloud.sensors.cloud_storage_transfer_service import CloudDataTransferServiceJobStatusSensor
from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.utils.dates import days_ago

PROJECT_ID = '{プロジェクトID}'
DATASET_ID = 'work'
TABLE_ID = 'sales_data'
BUCKET_NAME = 'cm_enokawa_work'
TRANSFER_JOB_ID = 'transferJobs/5129259777506586129'  # 転送ジョブのID

# DAGの基本設定
default_args = {
    'start_date': days_ago(1),
    'retries': 0,  # リトライなし
}

with DAG(
    dag_id='gcs_transfer_to_bq_with_status_sensor',
    default_args=default_args,
    schedule_interval=None,  # 手動で実行する
    catchup=False
) as dag:

    # CloudDataTransferServiceJobStatusSensor を使って転送ジョブの完了を待機
    transfer_job_status_task = CloudDataTransferServiceJobStatusSensor(
        task_id='check_transfer_job_status',
        job_name=TRANSFER_JOB_ID,  # 転送ジョブのID
        project_id=PROJECT_ID,  # GCP プロジェクトID
        expected_statuses=['SUCCESS'],  # ジョブが成功ステータスを監視
        poke_interval=60,  # 60秒ごとにステータスをポーリング
        timeout=60 * 10    # 10分以内に完了しなければタイムアウト
    )

    # GCS のファイルリストを取得
    list_gcs_files = GCSListObjectsOperator(
        task_id='list_gcs_files',
        bucket=BUCKET_NAME,
        prefix='sales_',  # 'sales_' で始まるファイルを取得
    )

    # 取得したファイルリストを使って BigQuery にロード
    gcs_to_bq = GCSToBigQueryOperator(
        task_id='load_gcs_to_bq',
        bucket=BUCKET_NAME,  # GCS バケット名
        source_objects=list_gcs_files.output,  # 取得したファイルリストを使う
        destination_project_dataset_table=f'{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}',  # BigQuery のデータセットとテーブル
        source_format='CSV',  # ソースファイル形式
        write_disposition='WRITE_TRUNCATE',  # 既存データの上書き
        skip_leading_rows=1  # CSV のヘッダー行をスキップ
    )

    transfer_job_status_task >> list_gcs_files >> gcs_to_bq

この DAG では、transfer_job_status_taskで Storage Transfer Service の転送ジョブがSUCCESSになるまで待機します。
timeoutパタメータを指定して、10分以内 に完了しなければタイムアウトするようにしています。

監視対象となる転送ジョブは、転送ジョブIDで指定する必要があるため、事前に転送ジョブを作成してジョブIDを確認しておく必要があります。

cloud-composer-storage-transfer-service-status-sensor_09

転送ジョブを一時停止する

DAG を実行する前に、監視対象となる転送ジョブを実行中の状態にします。
具体的には、事前に作成した転送ジョブを開始後に一時停止します。

cloud-composer-storage-transfer-service-status-sensor_05

cloud-composer-storage-transfer-service-status-sensor_06

DAG を実行する

Cloud Composer の DAG 実行画面から、先ほど作成した DAG を手動で実行します。

まずは、CloudDataTransferServiceJobStatusSensorを使用していないDAG(gcs_transfer_to_bq)を実行してみます。

cloud-composer-storage-transfer-service-status-sensor_08

監視対象の転送ジョブは一時停止ステータスのままですが、DAG は正常終了しました。
DAG 側では転送ジョブのステータスを感知していないので、当然の結果となります。

次に、CloudDataTransferServiceJobStatusSensorを使用した DAG(gcs_transfer_to_bq_with_status_sensor)を実行します。

cloud-composer-storage-transfer-service-status-sensor_10

transfer_job_status_taskが実行中となり、転送ジョブの完了を待機します。

転送ジョブを再開する

転送ジョブを再開し、DAG がどのように動作するかを確認します。
一時停止中の転送ジョブを再開します。

cloud-composer-storage-transfer-service-status-sensor_11

転送ジョブが成功ステータスで完了しました。

cloud-composer-storage-transfer-service-status-sensor_12

DAG を確認すると、正常終了していました。

cloud-composer-storage-transfer-service-status-sensor_13

想定通り、監視対象の転送ジョブが完了するのを待機する動きになっているようです。
ガントチャートでも、transfer_job_status_taskが待機している様子が確認できます。

cloud-composer-storage-transfer-service-status-sensor_14

タイムアウトさせてみる

最後に、CloudDataTransferServiceJobStatusSensorがタイムアウトしたときの動作を確認してみます。

先ほどと同じように、転送ジョブを一時停止させた状態で DAG を実行します。
ただし、今回はタイムアウトさせるために 10分以上 一時停止させた状態にしておきます。

DAG を動かしてみると、すぐに正常終了してしまいました。
transfer_job_status_taskCloudDataTransferServiceJobStatusSensor)が実行中の状態のまま待機する動きを想定していましたが、想定とは異なる結果となりました。

cloud-composer-storage-transfer-service-status-sensor_15

改めてドキュメントを確認すると、以下の記載がありました。

Waits for at least one operation belonging to the job to have the expected status.

どうやら、
現在実行中の転送ジョブが期待通りのステータスになるまで待機する
のではなく、
ジョブに属する少なくとも 1 つの操作が期待どおりのステータスになるまで待機する
という仕様のようです。


ということで、新たに転送ジョブを作成し、監視対象のジョブIDを作成した転送ジョブのものに差し替えて、同じように動作確認してみました。

すると、transfer_job_status_taskの実行開始後 10分 経過するとタイムアウトとなり、DAG が失敗しました。

cloud-composer-storage-transfer-service-status-sensor_16

ガントチャートでも、 10分間 待機している様子が確認できます。

cloud-composer-storage-transfer-service-status-sensor_17


さらに、転送ジョブのステータスを変えながら、CloudDataTransferServiceJobStatusSensorの挙動を確認してみました。

以下は、転送ジョブのステータスとCloudDataTransferServiceJobStatusSensorが待機するか否かの関係を整理した表です。

転送ジョブのステータス 待機する / 待機しない
未実行 待機する
実行中 待機する
成功 待機しない
失敗 待機する

ドキュメントに記載の通り、成功ステータスが現れるまで待機する挙動になっていることが分かります。

まとめ

以上、Cloud Composer で Storage Transfer Service の転送ジョブのステータスを監視する方法を試してみました。

CloudDataTransferServiceJobStatusSensorを活用することで、転送ジョブが成功するまで待機する仕組みを構築できることが確認できました。

ただし、このセンサーはジョブに属する少なくとも1つの操作が期待どおりのステータスになるまで待機する仕様であるため、DAG内で作成・実行した転送ジョブを監視するシーンに特に適しています。
具体的には、下記のオペレーターと組み合わせることで、より柔軟で効率的なデータ転送ワークフローを実現できます。

  • CloudDataTransferServiceCreateJobOperator:新しい転送ジョブを作成するオペレーター
  • CloudDataTransferServiceRunJobOperator:既存の転送ジョブを実行するオペレーター

ちなみに、CloudDataTransferServiceJobStatusSensorを使用したタスク(今回のケースではtransfer_job_status_task)の詳細画面の [Cloud Storage Transfer Job] ボタンから対象の 転送ジョブのコンソール画面に遷移することができます。便利!

cloud-composer-storage-transfer-service-status-sensor_21

参考

この記事をシェアする

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.